from __future__ import annotations

import contextlib
import functools
import hashlib
import json
import os
import re
from collections.abc import Sequence

from pre_commit import lang_base
from pre_commit.prefix import Prefix
from pre_commit.util import CalledProcessError
from pre_commit.util import cmd_output_b

ENVIRONMENT_DIR = 'docker'
PRE_COMMIT_LABEL = 'PRE_COMMIT'
get_default_version = lang_base.basic_get_default_version
health_check = lang_base.basic_health_check
in_env = lang_base.no_env  # no special environment for docker

_HOSTNAME_MOUNT_RE = re.compile(
    rb"""
    /containers
    (?:/overlay-containers)?
    /([a-z0-9]{64})
    (?:/userdata)?
    /hostname
    """,
    re.VERBOSE,
)


def _get_container_id() -> str | None:
    with contextlib.suppress(FileNotFoundError):
        with open('/proc/1/mountinfo', 'rb') as f:
            for line in f:
                m = _HOSTNAME_MOUNT_RE.search(line)
                if m:
                    return m[1].decode()

    return None


def _get_docker_path(path: str) -> str:
    container_id = _get_container_id()
    if container_id is None:
        return path

    try:
        _, out, _ = cmd_output_b('docker', 'inspect', container_id)
    except CalledProcessError:
        # self-container was not visible from here (perhaps docker-in-docker)
        return path

    container, = json.loads(out)
    for mount in container['Mounts']:
        src_path = mount['Source']
        to_path = mount['Destination']
        if os.path.commonpath((path, to_path)) == to_path:
            # So there is something in common,
            # and we can proceed remapping it
            return path.replace(to_path, src_path)
    # we're in Docker, but the path is not mounted, cannot really do anything,
    # so fall back to original path
    return path


def md5(s: str) -> str:  # pragma: win32 no cover
    return hashlib.md5(s.encode()).hexdigest()


def docker_tag(prefix: Prefix) -> str:  # pragma: win32 no cover
    md5sum = md5(os.path.basename(prefix.prefix_dir)).lower()
    return f'pre-commit-{md5sum}'


def build_docker_image(
        prefix: Prefix,
        *,
        pull: bool,
) -> None:  # pragma: win32 no cover
    cmd: tuple[str, ...] = (
        'docker', 'build',
        '--tag', docker_tag(prefix),
        '--label', PRE_COMMIT_LABEL,
    )
    if pull:
        cmd += ('--pull',)
    # This must come last for old versions of docker.  See #477
    cmd += ('.',)
    lang_base.setup_cmd(prefix, cmd)


def install_environment(
        prefix: Prefix, version: str, additional_dependencies: Sequence[str],
) -> None:  # pragma: win32 no cover
    lang_base.assert_version_default('docker', version)
    lang_base.assert_no_additional_deps('docker', additional_dependencies)

    directory = lang_base.environment_dir(prefix, ENVIRONMENT_DIR, version)

    # Docker doesn't really have relevant disk environment, but pre-commit
    # still needs to cleanup its state files on failure
    build_docker_image(prefix, pull=True)
    os.mkdir(directory)


@functools.lru_cache(maxsize=1)
def _is_rootless() -> bool:  # pragma: win32 no cover
    retcode, out, _ = cmd_output_b(
        'docker', 'system', 'info', '--format', '{{ json . }}',
    )
    if retcode != 0:
        return False

    info = json.loads(out)
    try:
        return (
            # docker:
            # https://docs.docker.com/reference/api/engine/version/v1.48/#tag/System/operation/SystemInfo
            'name=rootless' in (info.get('SecurityOptions') or ()) or
            # podman:
            # https://docs.podman.io/en/latest/_static/api.html?version=v5.4#tag/system/operation/SystemInfoLibpod
            info['host']['security']['rootless']
        )
    except KeyError:
        return False


def get_docker_user() -> tuple[str, ...]:  # pragma: win32 no cover
    if _is_rootless():
        return ()

    try:
        return ('-u', f'{os.getuid()}:{os.getgid()}')
    except AttributeError:
        return ()


def get_docker_tty(*, color: bool) -> tuple[str, ...]:  # pragma: win32 no cover  # noqa: E501
    return (('--tty',) if color else ())


def docker_cmd(*, color: bool) -> tuple[str, ...]:  # pragma: win32 no cover
    return (
        'docker', 'run',
        '--rm',
        *get_docker_tty(color=color),
        *get_docker_user(),
        # https://docs.docker.com/engine/reference/commandline/run/#mount-volumes-from-container-volumes-from
        # The `Z` option tells Docker to label the content with a private
        # unshared label. Only the current container can use a private volume.
        '-v', f'{_get_docker_path(os.getcwd())}:/src:rw,Z',
        '--workdir', '/src',
    )


def run_hook(
        prefix: Prefix,
        entry: str,
        args: Sequence[str],
        file_args: Sequence[str],
        *,
        is_local: bool,
        require_serial: bool,
        color: bool,
) -> tuple[int, bytes]:  # pragma: win32 no cover
    # Rebuild the docker image in case it has gone missing, as many people do
    # automated cleanup of docker images.
    build_docker_image(prefix, pull=False)

    entry_exe, *cmd_rest = lang_base.hook_cmd(entry, args)

    entry_tag = ('--entrypoint', entry_exe, docker_tag(prefix))
    return lang_base.run_xargs(
        (*docker_cmd(color=color), *entry_tag, *cmd_rest),
        file_args,
        require_serial=require_serial,
        color=color,
    )
